package com.chenjl.trace.stream.topology;

import java.util.Map;

import com.chenjl.trace.stream.util.Constant;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * Bolt进行计算，统计单词出现的次数
 * 2018-8-16 20:02:12
 * @author chenjinlong
 */
@Slf4j
public class WordCountRichBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;
	
	private JedisPool jedisPool = null;
	private String redisHost = "10.113.31.109";
	private Integer redisPort = 6379;
	
	private OutputCollector outputCollector ;
	
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext topologyContext, OutputCollector outputCollector) {
		log.info("IRichBolt prepare~~");
		this.outputCollector = outputCollector;
		
		this.jedisPool = new JedisPool(redisHost,redisPort);
	}
	@Override
	public void execute(Tuple tuple) {
		String word = tuple.getStringByField(Constant.WORD_FIELD);
		log.info("WordCountRichBolt receive word : {}",word);
		
		Jedis jedis = jedisPool.getResource();
		jedis.lpush(Constant.WORD_FIELD,word);
		jedis.close();
		
		outputCollector.ack(tuple);
	}
	@Override
	public void cleanup() {
		log.info("WordCountRichBolt cleanup~");
	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		
	}
	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}